#ifndef __MD_PROTO__
#define __MD_PROTO__

#include "fs_proto.h"
#include "fileinfo.h"
#include "net_proto.h"
#include "lease.h"
#include "lich_md.h"
#include "../replica/replica.h"
#include "net_global.h"

inline static int __md_proto_chunk_has(const reploc_t *dist, int count, reploc_t *nid)
{
        int i;

        for (i = 0; i < count; ++i) {
                if (nid_cmp(&nid->id, &dist[i].id) == 0) {
                        *nid = dist[i];
                        return 1;
                }
        }

        return 0;
}

inline static int __md_proto_chunk_replace(chkinfo_t *chkinfo,
                                           const reploc_t *dist, int count, const nid_t *oper)
{
        int ret, i;
        reploc_t _dist[LICH_REPLICA_MAX + 1];

        if (nid_cmp(&chkinfo->diskid[0].id, &dist[0].id)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        if (nid_cmp(&chkinfo->diskid[0].id, oper)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        memcpy(_dist, dist, sizeof(*dist) * count);
        for (i = 0; i < count; ++i) {
                if (__md_proto_chunk_has(chkinfo->diskid, chkinfo->repnum, &_dist[i]) == 0) {
                        continue;
                }
        }

        memcpy(chkinfo->diskid, _dist, sizeof(*dist) * count);
        chkinfo->repnum = count;
        chkinfo->info_version++;

        return 0;
err_ret:
        return ret;
}

inline static int __md_proto_chunk_force(chkinfo_t *chkinfo,
                                           const reploc_t *dist, int count)
{
        memcpy(chkinfo->diskid, dist, sizeof(*dist) * count);
        chkinfo->repnum = count;
        chkinfo->info_version++;

        return 0;
}

static inline int md_proto_setattr(fileinfo_t *fileinfo, const setattr_t *setattr, const chkid_t *id)
{
        if (id) {
                fileinfo->id = *id;
                fileinfo->repnum_sys = gloconf.metadata_replica;
        }

        if (setattr) {
                if (setattr->mode.set_it) {
                        fileinfo->mode = setattr->mode.val & MODE_MAX;
                }

                if (setattr->uid.set_it) {
                        fileinfo->uid = setattr->uid.val;
                }

                if (setattr->gid.set_it) {
                        fileinfo->gid = setattr->gid.val;
                }

                if (setattr->size.set_it) {
                        if (setattr->size.size <= fileinfo->size) {
                                DERROR("cannot set "CHKID_FORMAT" size %ju to %ju\n",
                                       CHKID_ARG(&fileinfo->id), fileinfo->size, setattr->size.size);
                                /* TODO temporary for vmware set size from big to small */
                                return EPERM;
                        }

                        if (setattr->size.size > VOLUME_SIZE_MAX) {
                                DERROR("cannot set "CHKID_FORMAT" size %ju to %ju, volume size max: %lld Byte\n",
                                       CHKID_ARG(&fileinfo->id), fileinfo->size, setattr->size.size, VOLUME_SIZE_MAX);
                                return EFBIG;
                        }

                        DBUG("set "CHKID_FORMAT" size %ju\n", CHKID_ARG(&fileinfo->id), setattr->size.size);
#if LSV
                        // TODO logical size
                        if (setattr->size.set_it & SET_LOGICAL_SIZE) {
                                fileinfo->logical_size = setattr->size.size;
                                if (fileinfo->size < fileinfo->logical_size) {
                                        fileinfo->size = fileinfo->logical_size;
                                }
                        }

                        if (setattr->size.set_it & SET_PHYSICAL_SIZE) {
                                fileinfo->size = setattr->size.size;
                        }
#else
                        if (setattr->size.set_it) {
                                fileinfo->size = setattr->size.size;
                        }
#endif
                }

                if (setattr->atime.set_it) {
                        fileinfo->atime = setattr->atime.time.seconds;
                }

                if (setattr->mtime.set_it) {
                        fileinfo->mtime = setattr->mtime.time.seconds;
                }

                if (setattr->ctime.set_it) {
                        fileinfo->ctime = setattr->ctime.time.seconds;
                }

                if (setattr->btime.set_it) {
                        fileinfo->btime = setattr->btime.time.seconds;
                }

                /* must set ec before repnum, set repnum will be used ec */
                if (setattr->ec.set_it) {
                        fileinfo->ec = setattr->ec.ec;
                        if (fileinfo->id.type == __VOLUME_CHUNK__ &&
                                        fileinfo->ec.plugin != PLUGIN_NULL) {
                                fileinfo->repnum_usr = fileinfo->ec.m;
                        }
                }

                if (setattr->replica.set_it) {
                        if (setattr->replica.val < LICH_REPLICA_MIN) {
                            DERROR("cannot set "CHKID_FORMAT" repnum %d to %d\n",
                                  CHKID_ARG(&fileinfo->id), fileinfo->repnum_usr, setattr->replica.val);
                            return EPERM;
                        }

                        if (fileinfo->id.type == __VOLUME_CHUNK__ &&
                                        fileinfo->ec.plugin != PLUGIN_NULL) {
                                if (setattr->replica.val != fileinfo->ec.m) {
                                        DERROR("cannot set "CHKID_FORMAT" repnum %d to %d, volume set ec %u+%u\n",
                                                        CHKID_ARG(&fileinfo->id), fileinfo->repnum_usr,
                                                        setattr->replica.val, fileinfo->ec.k,
                                                        fileinfo->ec.m - fileinfo->ec.k);
                                        return EPERM;
                                }
                        }

                        fileinfo->repnum_usr = setattr->replica.val;
                }

                if (setattr->clone.set_it) {
                        if (setattr->clone.val) {
                                fileinfo->attr |= __FILE_ATTR_CLONE__;
                        } else {
                                fileinfo->attr &= ~__FILE_ATTR_CLONE__;
                        }
                }

                if (setattr->priority.set_it) {
                        fileinfo->priority = setattr->priority.val;
                }

                if (setattr->protect.set_it) {
                        if (setattr->protect.val) {
                                fileinfo->attr |= __FILE_ATTR_PROTECT__;
                        } else {
                                fileinfo->attr &= ~__FILE_ATTR_PROTECT__;
                        }
                }

                if (setattr->localize.set_it) {
                        if (setattr->localize.val) {
                                if (fileinfo->id.type != __VOLUME_CHUNK__ ||
                                                fileinfo->ec.plugin == PLUGIN_NULL)
                                        fileinfo->attr |= __FILE_ATTR_LOCALIZE__;
                        } else {
                                fileinfo->attr &= ~__FILE_ATTR_LOCALIZE__;
                        }
                }

                if (setattr->multpath.set_it) {
                        if (setattr->multpath.val) {
                                fileinfo->attr |= __FILE_ATTR_MULTPATH__ ;
                        } else {
                                fileinfo->attr &= ~__FILE_ATTR_MULTPATH__;
                        }
                }

                if (setattr->writeback.set_it) {
                        if (setattr->writeback.val) {
                                fileinfo->attr |= __FILE_ATTR_WRITEBACK__ ;
                        } else {
                                if (fileinfo->id.type == __VOLUME_CHUNK__
                                    && (fileinfo->attr |= __FILE_ATTR_WRITEBACK__)) {
                                        return EPERM;
                                }

                                fileinfo->attr &= ~__FILE_ATTR_WRITEBACK__;
                        }
                }

#if LSV
                if (setattr->lsv.set_it) {
                        // TODO attr lsv
                        if (setattr->lsv.val == VOLUME_FORMAT_ROW2) {
                                fileinfo->attr |= __FILE_ATTR_ROW2__ ;
                        } else if (setattr->lsv.val == VOLUME_FORMAT_ROW3) {
                                fileinfo->attr |= __FILE_ATTR_ROW3__ ;
                        } else if (setattr->lsv.val == VOLUME_FORMAT_LSV) {
                                fileinfo->attr |= __FILE_ATTR_LSV__ ;
                        } else {
                                fileinfo->attr &= ~__FILE_ATTR_LSV__;
                                fileinfo->attr &= ~__FILE_ATTR_ROW2__;
                                fileinfo->attr &= ~__FILE_ATTR_ROW3__;
                        }
                }

                if (setattr->lsv_clone.set_it) {
                        fileinfo->attr |= __FILE_ATTR_LSV_CLONE__ ;
                        YASSERT(setattr->lsv_clone.source > 0);
                        fileinfo->source = setattr->lsv_clone.source;
                        strcpy(fileinfo->snap, setattr->lsv_clone.snap);
                } else {
                        fileinfo->attr &= ~__FILE_ATTR_LSV_CLONE__;
                }
#endif
                if (setattr->deleting.set_it) {
                        if (setattr->deleting.val) {
                                fileinfo->attr |= __FILE_ATTR_DELETE__ ;
                        } else {
                                fileinfo->attr &= ~__FILE_ATTR_DELETE__;
                        }
                }

                if (setattr->readraw.set_it) {
                        if (setattr->readraw.val) {
                                fileinfo->attr |= __FILE_ATTR_READRAW__ ;
                        } else {
                                fileinfo->attr &= ~__FILE_ATTR_READRAW__;
                        }
                }

        }

        return 0;
}

inline static int __chkinfo_set_status(chkinfo_t *chkinfo, int idx, int status, int *seted)
{
        int ret, i, clean = 0;

        if (status != __S_CLEAN) {
                for (i = 0; i < (int)chkinfo->repnum; i++) {
                        if (idx == i)
                                continue;

                        if (chkinfo->diskid[i].status == __S_CLEAN) {
                                clean++;
                        }
                }

                if (!clean) {
                        ret  = EAGAIN;
                        GOTO(err_ret, ret);
                }
        }

        /* cannot set replica status from dirty to check, so we ignore */
        if (chkinfo->diskid[idx].status == __S_DIRTY && status == __S_CHECK) {
                DWARN("set chunk "CHKID_FORMAT" @ %s status from dirty to check\n", CHKID_ARG(&chkinfo->id), network_rname(&chkinfo->diskid[idx].id));
                if (seted)
                        *seted = 0;
                goto out;
        }

        chkinfo->diskid[idx].status = status;
        chkinfo->info_version++;
        if (seted)
                *seted = 1;

out:
        return 0;
err_ret:
        return ret;
}

inline static int chkinfo_set_status(chkinfo_t *chkinfo, int idx, int status)
{
        return __chkinfo_set_status(chkinfo, idx, status, NULL);
}

inline static void chkinfo_replace(chkinfo_t *chkinfo, int idx, const nid_t *nid)
{
        chkinfo->diskid[idx].id = *nid;
        chkinfo->info_version++;
}


inline static int __md_proto_migrate(chkinfo_t *chkinfo)
{
        int ret, i, idx;
        reploc_t reploc, *pos;

        //CHKINFO_DUMP(chkinfo, D_INFO);

        ret = network_connect(&chkinfo->diskid[0].id, NULL, 1, 0);
        if (unlikely(ret)) {
                if (ret == ENONET || ret == EAGAIN) {
                        //go on
                } else
                        GOTO(err_ret, ret);
        } else {
                //CHKINFO_DUMP(chkinfo, D_INFO);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        idx = -1;
        for (i = 1; i < (int)chkinfo->repnum; ++i) {
                pos = &chkinfo->diskid[i];
                if (net_islocal(&pos->id)) {
                        if (pos->status) {
                                CHKINFO_DUMP(chkinfo, D_INFO);
                                ret = EPERM;
                                GOTO(err_ret, ret);
                        }

                        idx = i;
                        break;
                }
        }

        if (idx == -1) {
                //CHKINFO_DUMP(chkinfo, D_INFO);
                ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        YASSERT(net_islocal(&chkinfo->diskid[idx].id));
        reploc = chkinfo->diskid[0];
        chkinfo->diskid[0] = chkinfo->diskid[idx];
        chkinfo->diskid[idx] = reploc;
        chkinfo->info_version++;

        CHKINFO_DUMP(chkinfo, D_INFO);

        return 0;
err_ret:
        return ret;
}

inline static int md_proto_reject(chkinfo_t *chkinfo, const nid_t *nid)
{
        int ret, i, idx;
        reploc_t reploc, *pos;

        ret = network_connect(nid, NULL, 1, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        idx = -1;
        for (i = 0; i < (int)chkinfo->repnum; ++i) {
                pos = &chkinfo->diskid[i];
                if (pos->status || nid_cmp(&pos->id, nid) == 0)
                        continue;

                ret = network_connect(&pos->id, NULL, 1, 0);
                if (unlikely(ret))
                        continue;

                idx = i;
        }

        if (idx == -1) {
                CHKINFO_DUMP(chkinfo, D_INFO);
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        reploc = chkinfo->diskid[0];
        chkinfo->diskid[0] = chkinfo->diskid[idx];
        chkinfo->diskid[idx] = reploc;
        chkinfo->info_version++;

        return 0;
err_ret:
        return ret;
}

static inline int md_proto_chunk_cleanup1(const chkid_t *chkid, const nid_t *nid, uint64_t meta_version)
{
        int ret;

        SCHEDULE_LEASE_CHECK(err_ret, ret);

        YASSERT(!chkid_isnull(chkid));
        
        if (net_islocal(nid)) {
                replica_srv_unlink(chkid, meta_version);
        } else {
                ret = network_connect1(nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = replica_rpc_unlink(nid, chkid, meta_version);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

static inline int md_proto_chunk_cleanup(const chkinfo_t *chkinfo, const nid_t *nid, uint64_t meta_version)
{
        int ret, i;
        const chkid_t *chkid;

        SCHEDULE_LEASE_CHECK(err_ret, ret);

        chkid = &chkinfo->id;
        for (i = 0; i < chkinfo->repnum; i++) {
                if (nid_cmp(&chkinfo->diskid[i].id, nid) == 0) {
                        break;
                }
        }

        YASSERT(!chkid_isnull(&chkinfo->id));
        if (i == chkinfo->repnum) {
                //not found;
                if (net_islocal(nid)) {
                        replica_srv_unlink(chkid, meta_version);
                } else {
                        ret = network_connect1(nid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = replica_rpc_unlink(nid, chkid, meta_version);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }
        } else {
                DINFO("chunk "CHKID_FORMAT" @ %s still in use\n",
                      CHKID_ARG(chkid), network_rname(nid));
        }

        return 0;
err_ret:
        return ret;
}

static inline int md_proto_migrate(const char *pool, const fileid_t *parent, const chkid_t *chkid, chkinfo_t *_chkinfo)
{
        int ret, i;
        const nid_t *nid;
        lease_t lease;
        chkinfo_t *chkinfo, *newinfo;
        char buf[CHKINFO_MAX], _newinfo[CHKINFO_MAX];
        char tmp1[MAX_BUF_LEN], tmp2[MAX_BUF_LEN];

        chkinfo = (void *)buf;
        ret = md_chunk_getinfo1(pool, parent, chkid, chkinfo, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (net_islocal(&chkinfo->diskid[0].id)) {
                DINFO("chunk "CHKID_FORMAT" already migrated\n", CHKID_ARG(chkid));
                goto out;
        }
        
        for (i = 1; i < chkinfo->repnum; i++) {
                nid = &chkinfo->diskid[i].id;
                if (net_islocal(nid)) {
                        break;
                }
        }

        if (i == chkinfo->repnum) {
                DBUG("chunk "CHKID_FORMAT" moved\n", CHKID_ARG(chkid));
                ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        newinfo = (void *)_newinfo;
        CHKINFO_CP(newinfo, chkinfo);
        ret = __md_proto_migrate(newinfo);
        if (unlikely(ret)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        ret = lease_create(&lease, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        ret = lease_set(&lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_chunk_update(pool, parent, newinfo, net_getnid(), chkinfo->info_version);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (newinfo->info_version != chkinfo->info_version) {
                CHKINFO_STR(chkinfo, tmp1);
                CHKINFO_STR(newinfo, tmp2);
                DINFO("%s  --> %s\n", tmp1, tmp2);
        }

        if (_chkinfo) {
                CHKINFO_CP(_chkinfo, newinfo);
        }
        
out:
        return 0;
err_ret:
        return ret;
}

#endif
